# Source code for hysop.tools.misc

# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""Some utilities to deal with workspaces, slices and other python objects

* :class:`~hysop.tools.misc.Utils` : some utilities to handle slices in hysop.

* :class:`~hysop.tools.misc.WorkSpaceTools` to deal with operators' local
  work spaces, see :ref:`work_spaces`.


"""
import inspect
import functools
import operator
import numpy as np

from hysop.constants import HYSOP_REAL, HYSOP_INTEGER
from hysop.tools.numpywrappers import npw


def getargspec(func):
    """Return ``(args, varargs, varkw, defaults)`` for *func*.

    Compatibility shim that mimics the tuple layout of the removed
    ``inspect.getargspec`` on top of :func:`inspect.getfullargspec`.
    """
    full = inspect.getfullargspec(func)
    return full.args, full.varargs, full.varkw, full.defaults
def prod(values):
    """Like :func:`sum` but for products (of integers).

    Parameters
    ----------
    values : iterable of numbers (anything :func:`numpy.prod` accepts)

    Returns
    -------
    The product, accumulated in int64 when possible (avoids silent
    32-bit overflow on platforms whose default integer is 32 bits);
    falls back to numpy's default dtype for inputs that reject the
    int64 accumulator.
    """
    try:
        return np.prod(values, dtype=np.int64)
    except Exception:
        # Was a bare `except:`, which also swallowed KeyboardInterrupt
        # and SystemExit; `Exception` keeps the intended fallback only.
        return np.prod(values)
def compute_nbytes(shape, dtype):
    """Total size in bytes of an array of given *shape* and *dtype*."""
    from hysop.tools.numerics import get_itemsize

    itemsize = get_itemsize(dtype)
    nbytes = itemsize * prod(shape)
    assert nbytes > 0
    return nbytes
def get_default_args(func):
    """Return a dictionary of arg_name:default_values for the input function."""
    spec = inspect.getfullargspec(func)
    if not spec.defaults:
        return {}
    # defaults align with the tail of the positional argument list
    tail = spec.args[-len(spec.defaults):]
    return dict(zip(tail, spec.defaults))
def get_argnames(func):
    """Return arguments name and possible varargs of *func*."""
    spec = inspect.getfullargspec(func)
    return spec.args, spec.varargs
def args2kargs(func, args):
    """Convert positional *args* into a keyword dict keyed by *func*'s parameter names."""
    names = inspect.getfullargspec(func).args
    return dict(zip(names, args))
def kargs2args(func, kargs, remove=()):
    """Convert a keyword dict into a tuple of positional argument values.

    Parameters
    ----------
    func : callable
        its signature defines the positional order.
    kargs : dict
        argument name -> value.
    remove : collection of str, optional
        argument names to skip (default: none).

    Returns
    -------
    tuple of values ordered like *func*'s positional parameters.

    Notes
    -----
    The default used to be the mutable ``remove=[]``; an empty tuple is
    behaviorally identical here (only membership is tested) and avoids
    the shared-mutable-default pitfall.
    """
    argnames = inspect.getfullargspec(func).args
    return tuple(kargs[a] for a in argnames if a not in remove)
def upper_pow2(x):
    """Smallest power of two greater than or equal to x (elementwise).

    Scalars return a scalar, numpy arrays an array with the same dtype,
    and other sequences a container of the same type.

    Raises
    ------
    RuntimeError if any value is negative.
    """
    def _ceil2(v):
        if v < 0:
            raise RuntimeError("x<0")
        p = 1
        while p < v:
            p *= 2
        return p

    if np.isscalar(x):
        return _ceil2(x)
    if isinstance(x, np.ndarray):
        return np.asarray([_ceil2(v) for v in x], dtype=x.dtype)
    return type(x)(_ceil2(v) for v in x)
def next_pow2(x):
    """Smallest power of two strictly greater than x (elementwise).

    Values <= 0 map to 1.  Scalars return a scalar, numpy arrays an
    array with the same dtype, other sequences a container of the
    same type.
    """
    def _strict2(v):
        if v <= 0:
            return 1
        # smallest power of two strictly above v
        p = 1
        while p <= v:
            p *= 2
        return p

    if np.isscalar(x):
        return _strict2(x)
    if isinstance(x, np.ndarray):
        return np.asarray([_strict2(v) for v in x], dtype=x.dtype)
    return type(x)(_strict2(v) for v in x)
def previous_pow2(x):
    """Half of the smallest power of two >= x (elementwise).

    Examples: 8 -> 4, 5 -> 4, 2 -> 1, 1 -> 0.  Requires x >= 1.
    Scalars return a scalar, numpy arrays an array with the same dtype,
    other sequences a container of the same type.
    """
    def _ceil2(v):
        p = 1
        while p < v:
            p *= 2
        return p

    def _prev2(v):
        assert v >= 1
        half = _ceil2(v) // 2
        if v == half:  # defensive recompute; cannot trigger for v >= 1
            half = _ceil2(v - 1) // 2
        return half

    if np.isscalar(x):
        return _prev2(x)
    if isinstance(x, np.ndarray):
        return np.asarray([_prev2(v) for v in x], dtype=x.dtype)
    return type(x)(_prev2(v) for v in x)
def upper_pow2_or_3(x):
    """Round x up to the next power of two, except that 3 is kept as 3.

    Works on scalars and (elementwise) on numpy arrays.
    """
    def _ceil2_scalar(v):
        if v < 0:
            raise RuntimeError("x<0")
        p = 1
        while p < v:
            p *= 2
        return p

    def _ceil2(v):
        if np.isscalar(v):
            return _ceil2_scalar(v)
        if isinstance(v, np.ndarray):
            return np.asarray([_ceil2_scalar(w) for w in v], dtype=v.dtype)
        return type(v)(_ceil2_scalar(w) for w in v)

    if np.isscalar(x):
        return x if x == 3 else _ceil2_scalar(x)
    y = _ceil2(x)
    y[x == 3] = 3  # restore the exceptional value elementwise
    return y
class Utils:
    """Tools to handle arrays and slices."""

    @staticmethod
    def argsort(seq):
        """Indirect sort of *seq* using Python's default sorting algorithm.

        Returns a tuple of indices of the same length as the input,
        such that ``[seq[i] for i in Utils.argsort(seq)]`` is sorted.
        (This description previously sat as a stray no-op string on the
        class body; it belongs here.)
        """
        return tuple(sorted(range(len(seq)), key=seq.__getitem__))

    @staticmethod
    def upper_pow2(x):
        """Smallest power of two >= x, elementwise for iterables.

        NOTE(review): duplicates the module-level :func:`upper_pow2`,
        except that non-scalar inputs always come back as a plain
        numpy array (default dtype).  Kept for interface stability.
        """
        def _upper_pow2(v):
            if v < 0:
                raise RuntimeError("x<0")
            k = 1
            while k < v:
                k *= 2
            return k

        if np.isscalar(x):
            return _upper_pow2(x)
        return np.asarray([_upper_pow2(v) for v in x])

    @staticmethod
    def array_to_dict(inarray):
        """Convert a 2D index array into a dictionary of slice lists.

        Keys are the column numbers of *inarray*; each column holds
        (start, end) pairs with *inclusive* ends, turned into slices:
        column [1, 4, 2, 6, ...] ---> [slice(1, 5), slice(2, 7), ...]
        """
        outslice = {}
        size = inarray.shape[1]
        # rows come in (start, end) pairs -> half as many dimensions
        dimension = inarray.shape[0] // 2
        for rk in range(size):
            outslice[rk] = [
                slice(inarray[2 * d, rk], inarray[2 * d + 1, rk] + 1)
                for d in range(dimension)
            ]
        return outslice

    @staticmethod
    def intersect_slices(sl1, sl2):
        """Elementwise intersection of two sequences of slices.

        Parameters
        ----------
        sl1, sl2 : sequences of slices of the same length
            (slices must have explicit start/stop/step)

        Returns
        -------
        tuple of slices such that result[i] is the intersection of
        sl1[i] and sl2[i], or None if any intersection is empty.

        Raises
        ------
        NotImplementedError
            if a pair of slices has different steps.
        """
        assert len(sl1) == len(sl2)
        res = [None] * len(sl1)
        for d, (s1, s2) in enumerate(zip(sl1, sl2)):
            if s1.step != s2.step:
                raise NotImplementedError(
                    "Multi-step intersection has not been implemented yet."
                )
            start = max(s1.start, s2.start)
            stop = min(s1.stop, s2.stop)
            if stop <= start:
                # an empty intersection in one direction voids everything
                return None
            res[d] = slice(start, stop, s1.step)
        return tuple(res)

    @staticmethod
    def is_on_proc(sl):
        """True if *sl* represents a non-empty set of indices."""
        return (np.asarray([ind.stop != ind.start for ind in sl])).all()
class WorkSpaceTools:
    """Tools to deal with internal work arrays for operators."""

    @staticmethod
    def check_work_array(lwork, subshape, work=None, data_type=HYSOP_REAL):
        """Check properties of existing working arrays or allocate new
        buffers compliant with some properties.

        Parameters
        ----------
        lwork : int
            required number of working arrays.
        subshape : tuple of int, or list of tuples of int
            required shape(s) for the work arrays;
            subshape[i] == expected shape for work[i].
        work : list of numpy arrays, optional
            pre-allocated buffers to validate and reshape; if None,
            fresh arrays are allocated.
        data_type : either HYSOP_REAL or HYSOP_INTEGER

        Notes
        -----
        Work arrays are 1D arrays of size prod(subshape) that are
        'reshaped' according to the specific needs of each operator.
        That means that one memory location (the 1D array) may be shared
        between several operators thanks to the numpy reshape function.
        """
        from hysop.tools.numpywrappers import npw

        result = []
        if isinstance(subshape, list):
            subsize = [prod(shape) for shape in subshape]
        else:
            # a single shape is required for all lwork arrays
            subsize = [prod(subshape)] * lwork
            subshape = [subshape] * lwork
        if work is None:
            for i in range(lwork):
                result.append(
                    npw.zeros(subsize[i], dtype=data_type).reshape(subshape[i])
                )
        else:
            assert isinstance(work, list), "rwork must be a list."
            msg = "Work arrays list must be at least of size "
            msg += str(lwork) + "."
            assert len(work) >= lwork, msg
            msg1 = "Work array size is too small."
            msg2 = "Work array must be a flat array (1D)."
            try:
                for i in range(lwork):
                    wk = work[i]
                    assert wk.size >= subsize[i], msg1
                    assert len(np.where(np.asarray(wk.shape) > 1)[0]) == 1, msg2
                    result.append(wk.ravel()[: subsize[i]].reshape(subshape[i]))
                # reshaped views must still share memory with the originals
                for i in range(len(result)):
                    assert npw.arrays_share_data(result[i], work[i])
            except AttributeError:
                # Work array has been replaced by an OpenCL Buffer.
                # Testing the buffer size instead of shape.
                for i in range(lwork):
                    wk = work[i]
                    s = wk.size // subsize[i]
                    WorkSpaceTools._check_ocl_buffer(s, data_type)
                result = work
        return result

    @staticmethod
    def _check_ocl_buffer(s, dtype):
        """Check if opencl buffer itemsize *s* is compliant with input type."""
        if dtype is HYSOP_REAL:
            assert (HYSOP_REAL is np.float32 and s == 4) or (
                HYSOP_REAL is np.float64 and s == 8
            )
        elif dtype is HYSOP_INTEGER:
            assert (
                (HYSOP_INTEGER is np.int16 and s == 2)
                or (HYSOP_INTEGER is np.int32 and s == 4)
                or (HYSOP_INTEGER is np.int64 and s == 8)
            )

    @staticmethod
    def find_common_workspace(operators, array_type="rwork"):
        """Allocate a list of common workspaces able to work with some
        given operators.

        Parameters
        ----------
        operators : a list or a dictionnary of operators
            Each component of the list or value of the dictionnary must
            be an object that may need some internal workspaces and with
            a 'get_work_properties' function, hysop operators indeed.
        array_type : string, optional
            between 'rwork' (real arrays) or 'iwork' (integer arrays),
            depending on the required work. Default is 'rwork'.

        Returns
        -------
        work : list of numpy arrays
            workspaces common to all operators.

        Example
        -------
        # op1, op2 some predifined operators
        op1.discretize()
        op2.discretize()
        rwork = Utils.find_common_workspace([op1, op2])
        iwork = Utils.find_common_workspace([op1, op2], 'iwork')
        op1.setup(rwork=work, iwork=iwork)
        op2.setup(rwork=work)
        # work is a common internal list of workspaces that can be used
        # by both operators.
        """
        # NOTE: a redundant local `npw` import used to sit *before* this
        # docstring, turning it into a dead string expression; the module
        # already imports npw at top level, so the import was removed.
        properties = []
        if isinstance(operators, dict):
            oplist = operators.values()
        elif isinstance(operators, list):
            oplist = operators
        else:
            raise AttributeError("operators must be a list or a dict")
        for op in oplist:
            properties.append(op.get_work_properties()[array_type])
        return WorkSpaceTools.allocate_common_workspaces(properties)

    @staticmethod
    def allocate_common_workspaces(work_properties):
        """Allocate a list of common workspaces according to some given
        properties.

        Parameters
        ----------
        work_properties : list of lists of tuples
            properties of workspaces. Each component of this list must
            be the return value of a get_work_properties function
            (for instance from an operator).

        Returns
        -------
        work : list of numpy arrays
            workspaces fitting with properties requirements.

        Example
        -------
        # op1, op2 some predifined operators
        op1.discretize()
        op2.discretize()
        wk_prop = []
        wk_prop.append(op1.get_work_properties()['rwork'])
        wk_prop.append(op2.get_work_properties()['rwork'])
        work = Utils.find_common_workspace(wk_prop)
        op1.setup(rwork=work)
        op2.setup(rwork=work)
        # work is a common internal list of workspaces that can be used
        # by both operators.
        """
        assert isinstance(work_properties, list)
        # Max number of required working arrays:
        properties = [p for p in work_properties if p is not None]
        lwork = max(len(prop) for prop in properties)
        # Then find the max required workspace for each work array slot;
        # workspaces are flat, so only total sizes (prod of shape) matter.
        shapes = [(0,)] * lwork
        for prop in properties:
            for i, required in enumerate(prop):
                shapes[i] = tuple(np.maximum(shapes[i], prod(required)))
        work = [npw.zeros(shape) for shape in shapes]
        return work